Execution Pools¶
Thread pool¶
- Required files:
test_plan.py¶
#!/usr/bin/env python
"""
This example is to demonstrate parallel test execution in a thread pool.
"""
import sys
from testplan import test_plan
from testplan import Task
from testplan.parser import TestplanParser
from testplan.runners.pools.base import Pool as ThreadPool
from testplan.report.testing.styles import Style, StyleEnum
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
class CustomParser(TestplanParser):
"""Inheriting base parser."""
def add_arguments(self, parser):
"""Defining custom arguments for this Testplan."""
parser.add_argument(
"--tasks-num",
action="store",
type=int,
default=8,
help="Number of tests to be scheduled.",
)
parser.add_argument(
"--pool-size",
action="store",
type=int,
default=4,
help="How many thread workers assigned to pool.",
)
# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with thread pool test execution.
# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
name="ThreadPoolExecution",
parser=CustomParser,
pdf_path="report.pdf",
stdout_style=OUTPUT_STYLE,
pdf_style=OUTPUT_STYLE,
)
def main(plan):
"""
Testplan decorated main function to add and execute MultiTests.
:return: Testplan result object.
:rtype: ``testplan.base.TestplanResult``
"""
# Add a thread pool test execution resource to the plan of given size.
pool = ThreadPool(name="MyPool", size=plan.args.pool_size)
plan.add_resource(pool)
# Add a given number of similar tests to the thread pool
# to be executed in parallel.
for idx in range(plan.args.tasks_num):
task = Task(
target="make_multitest", module="tasks", kwargs={"index": idx}
)
plan.schedule(task, resource="MyPool")
if __name__ == "__main__":
res = main()
print("Exiting code: {}".format(res.exit_code))
sys.exit(res.exit_code)
tasks.py¶
"""TCP connections tests to be executed in parallel in a thread pool."""
import threading
import time
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.common.utils.context import context
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient
def after_start(env):
"""
Called right after MultiTest starts.
"""
# Server accepts connection request made by client.
env.server.accept_connection()
@testsuite
class TCPTestsuite:
"""TCP communication tests."""
def __init__(self):
self._thread_id = threading.current_thread().name
@testcase
def send_and_receive_msg(self, env, result):
"""
Server client communication with a sleep in the middle that
represents processing time by the server before respond.
"""
# Client sends a message.
msg = env.client.cfg.name
result.log(
"Client on thread {} is sending: {}".format(self._thread_id, msg)
)
bytes_sent = env.client.send_text(msg)
received = env.server.receive_text(size=bytes_sent)
result.equal(received, msg, "Server received")
start_time = time.time()
# Sleeping here to represent a time consuming processing
# of the message received by the server before replying back.
time.sleep(1)
result.log(
"Server was processing message for {}s".format(
round(time.time() - start_time, 1)
)
)
response = "Hello {}".format(received)
result.log(
"Server on thread {} is responding: {}".format(
self._thread_id, response
)
)
# Server sends the reply.
bytes_sent = env.server.send_text(response)
received = env.client.receive_text(size=bytes_sent)
result.equal(received, response, "Client received")
def make_multitest(index=0):
"""
Creates a new MultiTest that runs TCP connection tests.
This will be created inside a thread worker.
"""
print(
"Creating a MultiTest on {}.".format(threading.current_thread().name)
)
test = MultiTest(
name="TCPMultiTest_{}".format(index),
suites=[TCPTestsuite()],
environment=[
TCPServer(name="server"),
TCPClient(
name="client",
host=context("server", "{{host}}"),
port=context("server", "{{port}}"),
),
],
after_start=after_start,
)
return test
Process pool¶
- Required files:
test_plan.py¶
#!/usr/bin/env python
"""
This example is to demonstrate parallel test execution in a process pool.
"""
import sys
from testplan import test_plan
from testplan import Task
from testplan.runners.pools.process import ProcessPool
from testplan.parser import TestplanParser
from testplan.report.testing.styles import Style, StyleEnum
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
class CustomParser(TestplanParser):
"""Inheriting base parser."""
def add_arguments(self, parser):
"""Defining custom arguments for this Testplan."""
parser.add_argument("--tasks-num", action="store", type=int, default=8)
parser.add_argument("--pool-size", action="store", type=int, default=4)
# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with process pool test execution.
# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
name="ProcessPoolExecution",
parser=CustomParser,
pdf_path="report.pdf",
stdout_style=OUTPUT_STYLE,
pdf_style=OUTPUT_STYLE,
)
def main(plan):
"""
Testplan decorated main function to add and execute MultiTests.
:return: Testplan result object.
:rtype: ``testplan.base.TestplanResult``
"""
# Add a process pool test execution resource to the plan of given size.
pool = ProcessPool(name="MyPool", size=plan.args.pool_size)
plan.add_resource(pool)
# Add a given number of similar tests to the process pool
# to be executed in parallel.
for idx in range(plan.args.tasks_num):
# All Task arguments need to be serializable.
task = Task(
target="make_multitest", module="tasks", kwargs={"index": idx}
)
plan.schedule(task, resource="MyPool")
if __name__ == "__main__":
res = main()
print("Exiting code: {}".format(res.exit_code))
sys.exit(res.exit_code)
tasks.py¶
"""TCP connections tests to be executed in parallel in a process pool."""
import os
import time
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.common.utils.context import context
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient
def after_start(env):
"""
Called right after MultiTest starts.
"""
# Server accepts connection request made by client.
env.server.accept_connection()
@testsuite
class TCPTestsuite:
"""TCP communication tests."""
def __init__(self):
self._process_id = os.getpid()
@testcase
def send_and_receive_msg(self, env, result):
"""
Server client communication with a sleep in the middle that
represents processing time by the server before respond.
"""
# Client sends a message.
msg = env.client.cfg.name
result.log(
"Client with process id {} is sending: {}".format(
self._process_id, msg
)
)
bytes_sent = env.client.send_text(msg)
received = env.server.receive_text(size=bytes_sent)
result.equal(received, msg, "Server received")
start_time = time.time()
# Sleeping here to represent a time consuming processing
# of the message received by the server before replying back.
time.sleep(1)
result.log(
"Server was processing message for {}s".format(
round(time.time() - start_time, 1)
)
)
response = "Hello {}".format(received)
result.log(
"Server with process id {} is responding: {}".format(
self._process_id, response
)
)
# Server sends the reply.
bytes_sent = env.server.send_text(response)
received = env.client.receive_text(size=bytes_sent)
result.equal(received, response, "Client received")
def make_multitest(index=0):
"""
Creates a new MultiTest that runs TCP connection tests.
This will be created inside a process worker.
"""
test = MultiTest(
name="TCPMultiTest_{}".format(index),
suites=[TCPTestsuite()],
environment=[
TCPServer(name="server"),
TCPClient(
name="client",
host=context("server", "{{host}}"),
port=context("server", "{{port}}"),
),
],
after_start=after_start,
)
return test
Remote pool¶
- Required files:
test_plan.py¶
#!/usr/bin/env python
# This plan contains tests that demonstrate failures as well.
"""
Parallel test execution in a remote pool.
"""
import os
import sys
import getpass
import shutil
import tempfile
# Check if the remote host has been specified in the environment. Remote
# hosts can only be Linux systems.
REMOTE_HOST = os.environ.get("TESTPLAN_REMOTE_HOST")
if not REMOTE_HOST:
raise RuntimeError(
"You must specify a remote Linux host via the TESTPLAN_REMOTE_HOST "
"environment var to run this example."
)
from testplan import test_plan
from testplan import Task
from testplan.runners.pools.remote import RemotePool
from testplan.common.utils.path import pwd
from testplan.parser import TestplanParser
from testplan.report.testing.styles import Style, StyleEnum
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
TEMP_DIR = None
class CustomParser(TestplanParser):
"""Inheriting base parser."""
def add_arguments(self, parser):
"""Defining custom arguments for this Testplan."""
parser.add_argument("--tasks-num", action="store", type=int, default=8)
parser.add_argument("--pool-size", action="store", type=int, default=4)
# Function that creates a file with some content
# to demonstrate custom file transferring.
def make_file(filename, dirname, content):
path = os.path.join(dirname, filename)
with open(path, "w") as fobj:
fobj.write(content)
return path
# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with remote pool test execution.
# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
name="RemotePoolExecution",
parser=CustomParser,
pdf_path=os.path.join(pwd(), "report.pdf"),
stdout_style=OUTPUT_STYLE,
pdf_style=OUTPUT_STYLE,
)
def main(plan):
"""
Testplan decorated main function to add and execute MultiTests.
:return: Testplan result object.
:rtype: ``testplan.base.TestplanResult``
"""
workspace = os.path.dirname(__file__)
# Create two temporary files locally. For demonstration, just write the
# filename as the content of each.
assert TEMP_DIR is not None
for filename in ("file1", "file2"):
make_file(filename, TEMP_DIR, content=filename)
# Explicitly specify the full paths to both the local source files just
# created and the destination filepaths on the remote host.
push_files = [
(os.path.join(TEMP_DIR, "file1"), "/tmp/remote_example/file1"),
(os.path.join(TEMP_DIR, "file2"), "/tmp/remote_example/file2"),
]
# Add a remote pool test execution resource to the plan of given size.
pool = RemotePool(
name="MyPool",
# Create 3 workers on the same remote host.
hosts={REMOTE_HOST: 3},
# Allow the remote port to be overridden by the
# environment. Default to 0, which will make testplan use
# the default SSH port for connections.
port=int(os.environ.get("TESTPLAN_REMOTE_PORT", 0)),
setup_script=["/bin/bash", "setup_script.ksh"],
env={"LOCAL_USER": getpass.getuser(), "LOCAL_WORKSPACE": workspace},
workspace_exclude=[".git/", ".cache/", "doc/", "test/"],
# We push local files to the remote worker using the
# explicit source and destination locations defined above.
push=push_files,
workspace=workspace,
clean_remote=True,
)
plan.add_resource(pool)
# Add a given number of similar tests to the remote pool
# to be executed in parallel.
for idx in range(plan.args.tasks_num):
# All Task arguments need to be serializable.
task = Task(
target="make_multitest",
module="tasks",
# We specify the full paths to files as they will be found
# on the remote host.
kwargs={
"index": idx,
"files": [
"/tmp/remote_example/file1",
"/tmp/remote_example/file2",
],
},
)
plan.schedule(task, resource="MyPool")
if __name__ == "__main__":
# Create a new temporary directory for this test plan.
TEMP_DIR = tempfile.mkdtemp()
# Run the test plan.
res = main()
# Clean up all the temporary files used by this test plan.
shutil.rmtree(TEMP_DIR)
print("Exiting code: {}".format(res.exit_code))
sys.exit(res.exit_code)
tasks.py¶
"""TCP connections tests to be executed in parallel in a remote pool."""
import os
import time
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.common.utils.context import context
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient
def after_start(env):
"""
Called right after MultiTest starts.
"""
# Server accepts connection request made by client.
env.server.accept_connection()
@testsuite
class TCPTestsuite:
"""TCP communication tests."""
def __init__(self, files):
self._process_id = os.getpid()
self._files = files
def setup(self, env, result):
result.log("LOCAL_USER: {}".format(os.environ["LOCAL_USER"]))
for _file in self._files:
with open(_file) as fobj:
result.log("Source file contents: {}".format(fobj.read()))
@testcase
def send_and_receive_msg(self, env, result):
"""
Server client communication with a sleep in the middle that
represents processing time by the server before respond.
"""
# Client sends a message.
msg = env.client.cfg.name
result.log(
"Client with process id {} is sending: {}".format(
self._process_id, msg
)
)
bytes_sent = env.client.send_text(msg)
received = env.server.receive_text(size=bytes_sent)
result.equal(received, msg, "Server received")
start_time = time.time()
# Sleeping here to represent a time consuming processing
# of the message received by the server before replying back.
time.sleep(1)
result.log(
"Server was processing message for {}s".format(
round(time.time() - start_time, 1)
)
)
response = "Hello {}".format(received)
result.log(
"Server with process id {} is responding: {}".format(
self._process_id, response
)
)
# Server sends the reply.
bytes_sent = env.server.send_text(response)
received = env.client.receive_text(size=bytes_sent)
result.equal(received, response, "Client received")
def make_multitest(index=0, files=None):
"""
Creates a new MultiTest that runs TCP connection tests.
This will be created inside a remote worker.
"""
test = MultiTest(
name="TCPMultiTest_{}".format(index),
suites=[TCPTestsuite(files)],
environment=[
TCPServer(name="server"),
TCPClient(
name="client",
host=context("server", "{{host}}"),
port=context("server", "{{port}}"),
),
],
after_start=after_start,
)
return test
setup_script.ksh¶
#!/bin/bash
echo 'Executing setup commands.'
echo 'Environment:'
env
echo 'User:'
echo $USER
echo 'Hostname:'
hostname
# Make a soft link in remote host to make the local workspace
# absolute path available for hardcoded entries.
if [ ! -d $TESTPLAN_LOCAL_WORKSPACE ];
then
mkdir -p `dirname $TESTPLAN_LOCAL_WORKSPACE`
ln -s $TESTPLAN_REMOTE_WORKSPACE $TESTPLAN_LOCAL_WORKSPACE
else
echo 'Local workspace is visible from the remote!'
fi
echo 'Finished setup commands.'
exit 0
Task Rerun¶
- Required files:
test_plan.py¶
#!/usr/bin/env python
"""
This example is to demonstrate task level rerun feature in a pool.
"""
import os
import sys
import uuid
import getpass
import tempfile
from testplan import test_plan
from testplan import Task
from testplan.runners.pools.base import Pool as ThreadPool
from testplan.parser import TestplanParser
from testplan.report.testing.styles import Style, StyleEnum
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
class CustomParser(TestplanParser):
"""Inheriting base parser."""
def add_arguments(self, parser):
"""Defining custom arguments for this Testplan."""
parser.add_argument("--pool-size", action="store", type=int, default=4)
# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with process pool test execution.
# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
name="PoolExecutionAndTaskRerun",
parser=CustomParser,
pdf_path="report.pdf",
stdout_style=OUTPUT_STYLE,
pdf_style=OUTPUT_STYLE,
)
def main(plan):
"""
Testplan decorated main function to add and execute MultiTests.
:return: Testplan result object.
:rtype: ``testplan.base.TestplanResult``
"""
# Add a thread pool test execution resource to the plan of given size.
# Can also use a process pool instead.
pool = ThreadPool(name="MyPool", size=plan.args.pool_size)
plan.add_resource(pool)
# Add a task with `rerun` argument to the thread pool
tmp_file = os.path.join(
tempfile.gettempdir(), getpass.getuser(), "{}.tmp".format(uuid.uuid4())
)
task = Task(
target="make_multitest", module="tasks", args=(tmp_file,), rerun=2
)
plan.schedule(task, resource="MyPool")
if __name__ == "__main__":
res = main()
print("Exiting code: {}".format(res.exit_code))
sys.exit(res.exit_code)
tasks.py¶
"""An unstable tests to be executed until pass."""
import os
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.common.utils.path import makedirs
@testsuite
class Unstablesuite:
"""
A test suite which has an unstable testcase.
The multitest containing this suite has to re-run twice
(3 times in total) in order to get passed.
"""
def __init__(self, tmp_file):
self._iteration = None
self._max_rerun = 2
self._tmp_file = tmp_file
def setup(self, env, result):
"""
Create a tmp text file which record how many times the suite
has been executed, should remove it after the last rerun.
"""
makedirs(os.path.dirname(self._tmp_file))
if not os.path.exists(self._tmp_file):
self._iteration = 0
else:
with open(self._tmp_file, "r") as fp:
self._iteration = int(fp.read())
if self._iteration == self._max_rerun:
os.remove(self._tmp_file)
else:
with open(self._tmp_file, "w") as fp:
fp.write(str(self._iteration + 1))
result.log("Suite setup in iteration {}".format(self._iteration))
@testcase
def unstable_testcase(self, env, result):
"""
An unstable testcase which can only pass at 3rd run (2nd rerun).
"""
if self._iteration == 2:
result.log("Test passes")
else:
result.fail("Test fails")
def make_multitest(tmp_file):
"""
Creates a new MultiTest that runs unstable tests.
"""
return MultiTest(
name="UnstableMultiTest",
suites=[Unstablesuite(tmp_file=tmp_file)],
environment=[],
)
MultiTest parts scheduling¶
- Required files:
test_plan.py¶
#!/usr/bin/env python
"""
This example is to demonstrate parallel test execution with parts in a pool.
"""
import sys
from testplan import test_plan
from testplan import Task
from testplan.parser import TestplanParser
from testplan.runners.pools.base import Pool as ThreadPool
from testplan.report.testing.styles import Style, StyleEnum
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
class CustomParser(TestplanParser):
"""Inheriting base parser."""
def add_arguments(self, parser):
"""Defining custom arguments for this Testplan."""
parser.add_argument(
"--parts-num",
action="store",
type=int,
default=3,
help="Number of parts to be split.",
)
parser.add_argument(
"--pool-size",
action="store",
type=int,
default=3,
help="How many thread workers assigned to pool.",
)
# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with thread pool test execution.
# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
name="MultiTestPartsExecution",
parser=CustomParser,
pdf_path="report.pdf",
stdout_style=OUTPUT_STYLE,
pdf_style=OUTPUT_STYLE,
merge_scheduled_parts=False,
)
def main(plan):
"""
Testplan decorated main function to add and execute MultiTests.
:return: Testplan result object.
:rtype: ``testplan.base.TestplanResult``
"""
# Add a thread pool test execution resource to the plan of given size.
# Also you can use process pool or remote pool instead.
pool = ThreadPool(name="MyPool", size=plan.args.pool_size)
plan.add_resource(pool)
# Add a given number of similar tests to the thread pool
# to be executed in parallel.
for idx in range(plan.args.parts_num):
task = Task(
target="make_multitest",
module="tasks",
kwargs={"part_tuple": (idx, plan.args.parts_num)},
)
plan.schedule(task, resource="MyPool")
if __name__ == "__main__":
res = main()
print("Exiting code: {}".format(res.exit_code))
sys.exit(res.exit_code)
tasks.py¶
"""Tests to be executed in multi-parts and task will be schedule in a pool."""
from testplan.testing.multitest import MultiTest, testsuite, testcase
@testsuite
class Suite1:
"""A test suite with several normal testcases."""
@testcase
def test_equal(self, env, result):
result.equal("foo", "foo", description="Equality example")
@testcase
def test_not_equal(self, env, result):
result.not_equal("foo", "bar", description="Inequality example")
@testcase
def test_less(self, env, result):
result.less(2, 12, description="Less comparison example")
@testcase
def test_greater(self, env, result):
result.greater(10, 5, description="Greater comparison example")
@testcase
def test_approximate_equal(self, env, result):
result.isclose(
100,
101,
rel_tol=0.01,
abs_tol=0.0,
description="Approximate equality example",
)
@testsuite
class Suite2:
"""A test suite with parameterized testcases."""
@testcase(parameters=tuple(range(6)))
def test_bool(self, env, result, val):
if val > 0:
result.true(val, description="Check if value is true")
else:
result.false(val, description="Check if value is false")
def make_multitest(part_tuple=None):
test = MultiTest(
name="MultitestParts", suites=[Suite1(), Suite2()], part=part_tuple
)
return test
Task discover¶
This example requires a file structure demonstrated as below. In this example, @task_target annotated functions defined under sub-projects will be discovered when plan.schedule_all is called, without requiring user to specify target/module/path separately for each task.
|~sub_proj1/
| |-__init__.py
| |-suites.py
| `-tasks.py
|~sub_proj2/
| |-__init__.py
| |-suites.py
| `-tasks.py
`-test_plan.py*
- Required files:
test_plan.py¶
#!/usr/bin/env python
"""
Discover and schedule tasks that spread across the project for parallel execution in Pool.
"""
import sys
from testplan import test_plan
from testplan.runners.pools.process import ProcessPool
from testplan.report.testing.styles import Style, StyleEnum
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
@test_plan(
name="TaskDiscovery",
pdf_path="report.pdf",
stdout_style=OUTPUT_STYLE,
pdf_style=OUTPUT_STYLE,
merge_scheduled_parts=True,
)
def main(plan):
"""
Testplan decorated main function to add and execute MultiTests.
:return: Testplan result object.
:rtype: ``testplan.base.TestplanResult``
"""
# Add a process pool test execution resource to the plan of given size.
# Also you can use thread pool or remote pool instead.
pool = ProcessPool(name="MyPool")
plan.add_resource(pool)
# Create task objects from all @task_target we could find in the modules
# that matches the name pattern under the specified path, and schedule them
# to MyPool.
plan.schedule_all(
path=".", name_pattern=r".*tasks\.py$", resource="MyPool"
)
if __name__ == "__main__":
res = main()
assert len(res.report.entries) == 5
print("Exiting code: {}".format(res.exit_code))
sys.exit(res.exit_code)
sub_proj1/tasks.py¶
"""A discoverable task target that will be instanciated to multiple task objects"""
from testplan.runners.pools.tasks.base import task_target
from testplan.testing.multitest import MultiTest
# need to use full path specification of the module
import sub_proj1.suites
# each entry in parameters will be used to create one task object
@task_target(
parameters=(
# positional args to be passed to target, as a tuple or list
("Proj1-Suite2", None, [sub_proj1.suites.Suite2]),
# keyword args to be passed to target, as a dict
dict(
name="Proj1-Suite1",
part_tuple=(0, 2),
suites=[sub_proj1.suites.Suite1],
),
dict(
name="Proj1-Suite1",
part_tuple=(1, 2),
suites=[sub_proj1.suites.Suite1],
),
),
# additional args of Task class
rerun=1,
weight=1,
)
def make_multitest(name, part_tuple=None, suites=None):
# a test target shall only return 1 runnable object
test = MultiTest(
name=name, suites=[cls() for cls in suites], part=part_tuple
)
return test
# an alternative way of specifying parts for multitest
@task_target(
parameters=(
dict(
name="Proj1-Suite1-Again",
suites=[sub_proj1.suites.Suite1],
),
),
# instruct testplan to split each multitest task into parts
multitest_parts=2,
# additional args of Task class
rerun=1,
weight=1,
)
def make_multitest1(name, suites=None):
# a test target shall only return 1 runnable object
test = MultiTest(name=name, suites=[cls() for cls in suites])
return test
sub_proj2/tasks.py¶
"""Discoverable task targets that will be instantiated to task objects"""
from testplan.runners.pools.tasks.base import task_target
from testplan.testing.multitest import MultiTest
from sub_proj2.suites import Suite1, Suite2
@task_target
def make_multitest1():
# a test target shall only return 1 runnable object
test = MultiTest(name="Proj2-Suite1", suites=[Suite1()])
return test
@task_target
def make_multitest2():
# a test target shall only return 1 runnable object
test = MultiTest(name="Proj2-Suite2", suites=[Suite2()])
return test
Auto-Part¶
This example requires command line argument --runtime-data runtime_data.txt
.
- Required files:
test_plan.py¶
#!/usr/bin/env python
"""
This example is to demonstrate auto-part feature in a process pool.
"""
import sys
from testplan import test_plan
from testplan.runners.pools.process import ProcessPool
from testplan.report.testing.styles import Style, StyleEnum
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
# The auto_part_runtime_limit argument instructs testplan to split parts="auto"
# Multitest into optimal number of parts so that the runtime of each part
# is not more than the limit.
@test_plan(
name="AutoPartExample",
pdf_path="report.pdf",
stdout_style=OUTPUT_STYLE,
pdf_style=OUTPUT_STYLE,
auto_part_runtime_limit=100,
plan_runtime_target=100,
)
def main(plan):
"""
Testplan decorated main function to add and execute MultiTests.
:return: Testplan result object.
:rtype: ``testplan.base.TestplanResult``
"""
# Enable smart-schedule pool size
pool = ProcessPool(name="MyPool", size="auto")
# Add a process pool test execution resource to the plan of given size.
plan.add_resource(pool)
# Discover tasks and calculate the right size of the pool based on the weight (runtime) of the
# tasks so that runtime of all tasks meets the plan_runtime_target.
plan.schedule_all(
path=".",
name_pattern=r".*task\.py$",
resource="MyPool",
)
if __name__ == "__main__":
res = main()
print("Exiting code: {}".format(res.exit_code))
sys.exit(res.exit_code)
runtime_data.txt¶
{
"Proj1-suite": {
"execution_time": 199.99,
"setup_time": 5
}
}
sub_task.py¶
from testplan.runners.pools.tasks.base import task_target
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver import Driver
@testsuite
class Suite1:
"""A test suite with several normal testcases."""
@testcase
def test1(self, env, result):
result.equal("foo", "foo", description="Equality example")
@testcase
def test2(self, env, result):
result.not_equal("foo", "bar", description="Inequality example")
@testcase
def test3(self, env, result):
result.less(2, 12, description="Less comparison example")
@testcase
def test4(self, env, result):
result.greater(10, 5, description="Greater comparison example")
@testsuite
class Suite2:
"""A test suite with parameterized testcases."""
@testcase(parameters=tuple(range(6)))
def test_bool(self, env, result, val):
if val > 0:
result.true(val, description="Check if value is true")
else:
result.false(val, description="Check if value is false")
@task_target(multitest_parts="auto")
def make_auto_part_multitest1():
return MultiTest(
name="Proj1-suite",
suites=[Suite1(), Suite2()],
environment=[Driver("NoopDriver")],
)